<a href="https://colab.research.google.com/github/VSennaa/text-classification-bigdata-agnews-spark/blob/main/text_classification_bigdata_agnews.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

https://huggingface.co/datasets/sh0416/ag_news

In [None]:
## Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, concat_ws
from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, RandomForestClassifier, LinearSVC
from pyspark.ml.feature import Tokenizer, StopWordsRemover, RegexTokenizer, HashingTF, IDF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
## Keras TensorFlow
import tensorflow as tf
from tensorflow import keras
from keras import layers
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
## OutrosZ
import pandas as pd
import numpy as np
import os
import time

inicio = time.time()


Inicizalizando o Spark


In [None]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Aquisição do dataset

In [None]:
!wget 'https://raw.githubusercontent.com/VSennaa/text-classification-bigdata-agnews-spark/refs/heads/main/train.jsonl'
!wget 'https://raw.githubusercontent.com/VSennaa/text-classification-bigdata-agnews-spark/refs/heads/main/test.jsonl'

--2025-08-06 18:37:35--  https://raw.githubusercontent.com/VSennaa/text-classification-bigdata-agnews-spark/refs/heads/main/train.jsonl
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 33697315 (32M) [text/plain]
Saving to: ‘train.jsonl’


2025-08-06 18:37:36 (301 MB/s) - ‘train.jsonl’ saved [33697315/33697315]

--2025-08-06 18:37:36--  https://raw.githubusercontent.com/VSennaa/text-classification-bigdata-agnews-spark/refs/heads/main/test.jsonl
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2125097 (2.0M) [text/plain]
Saving to: ‘

In [None]:
df_train = spark.read.json("/content/train.jsonl")
df_test = spark.read.json("/content/test.jsonl")

In [None]:
df_train.show()

+--------------------+-----+--------------------+
|         description|label|               title|
+--------------------+-----+--------------------+
|Reuters - Short-s...|    3|Wall St. Bears Cl...|
|Reuters - Private...|    3|Carlyle Looks Tow...|
|Reuters - Soaring...|    3|Oil and Economy C...|
|Reuters - Authori...|    3|Iraq Halts Oil Ex...|
|AFP - Tearaway wo...|    3|Oil prices soar t...|
|Reuters - Stocks ...|    3|Stocks End Up, Bu...|
|AP - Assets of th...|    3|Money Funds Fell ...|
|USATODAY.com - Re...|    3|Fed minutes show ...|
|Forbes.com - Afte...|    3|Safety Net (Forbe...|
| NEW YORK (Reuter...|    3|Wall St. Bears Cl...|
| NEW YORK (Reuter...|    3|Oil and Economy C...|
| TEHRAN (Reuters)...|    3|No Need for OPEC ...|
| JAKARTA (Reuters...|    3|Non-OPEC Nations ...|
| WASHINGTON/NEW Y...|    3|Google IPO Auctio...|
| NEW YORK (Reuter...|    3|Dollar Falls Broa...|
|If you think you ...|    3|Rescuing an Old S...|
|The purchasing po...|    3|Kids Rule for Bac...|


In [None]:
df_train.select("description").show()

+--------------------+
|         description|
+--------------------+
|Reuters - Short-s...|
|Reuters - Private...|
|Reuters - Soaring...|
|Reuters - Authori...|
|AFP - Tearaway wo...|
|Reuters - Stocks ...|
|AP - Assets of th...|
|USATODAY.com - Re...|
|Forbes.com - Afte...|
| NEW YORK (Reuter...|
| NEW YORK (Reuter...|
| TEHRAN (Reuters)...|
| JAKARTA (Reuters...|
| WASHINGTON/NEW Y...|
| NEW YORK (Reuter...|
|If you think you ...|
|The purchasing po...|
|There is little c...|
|The US trade defi...|
|Oil giant Shell c...|
+--------------------+
only showing top 20 rows



Verificando os caracteres para garantir que não haja necessidade de limpeza

In [None]:
unique_chars = set()
columns_to_check = ["description", "title"]
dataframes = {'train': df_train, 'test': df_test}

for split_name, dataframe in dataframes.items():
    for column_name in columns_to_check:
        chars_df = dataframe.select(split(col(column_name), "").alias("chars"))
        exploded_chars_df = chars_df.select(explode(col("chars")).alias("char"))
        collected_chars = exploded_chars_df.select("char").distinct().collect()
        for row in collected_chars:
            unique_chars.add(row.char)

print("".join(sorted(list(unique_chars))))

 !"#$&'()*,-./0123456789:;=?ABCDEFGHIJKLMNOPQRSTUVWXYZ\_abcdefghijklmnopqrstuvwxyz


Tokenização

In [None]:
#adicionar coluna text
df_train = df_train.withColumn("text", concat_ws(" ", "title", "description"))
df_test = df_test.withColumn("text", concat_ws(" ", "title", "description"))
df_train.describe().show()

+-------+--------------------+-----------------+--------------------+--------------------+
|summary|         description|            label|               title|                text|
+-------+--------------------+-----------------+--------------------+--------------------+
|  count|              120000|           120000|              120000|              120000|
|   mean|                NULL|              2.5|                NULL|                NULL|
| stddev|                NULL|1.118038647253962|                NULL|                NULL|
|    min|    1 Southern Ca...|                1| #147;Generic Sup...| #147;Generic Sup...|
|    max|you know, writing...|                4|worm has turned f...|worm has turned f...|
+-------+--------------------+-----------------+--------------------+--------------------+



In [None]:
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")
tokenized_df = tokenizer.transform(df_train)
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
filtered_df = stopwords_remover.transform(tokenized_df)

In [None]:
hashing = HashingTF(
    inputCol="filtered_tokens",
    outputCol="features",
    numFeatures=10000
)
features_df = hashing.transform(filtered_df)

In [None]:
idf = IDF(inputCol="features", outputCol="idf_features")
idf_model = idf.fit(features_df)
idf_df = idf_model.transform(features_df)
idf_df.show()

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|         description|label|               title|                text|              tokens|     filtered_tokens|            features|        idf_features|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Reuters - Short-s...|    3|Wall St. Bears Cl...|Wall St. Bears Cl...|[wall, st, bears,...|[wall, st, bears,...|(10000,[551,662,1...|(10000,[551,662,1...|
|Reuters - Private...|    3|Carlyle Looks Tow...|Carlyle Looks Tow...|[carlyle, looks, ...|[carlyle, looks, ...|(10000,[157,1030,...|(10000,[157,1030,...|
|Reuters - Soaring...|    3|Oil and Economy C...|Oil and Economy C...|[oil, and, econom...|[oil, economy, cl...|(10000,[217,532,6...|(10000,[217,532,6...|
|Reuters - Authori...|    3|Iraq Halts Oil Ex...|Iraq Halts Oil Ex...|

### MLLIB

In [None]:
lr = LogisticRegression(featuresCol="idf_features", labelCol="label")
nv = NaiveBayes(featuresCol="idf_features", labelCol="label")
dt = DecisionTreeClassifier(featuresCol="idf_features", labelCol="label")
rf = RandomForestClassifier(featuresCol="idf_features", labelCol="label")
svm = LinearSVC(featuresCol="idf_features", labelCol="label")

Montar o Pipeline

In [None]:
models = [lr, nv, dt, rf]

 # Laço de treino e teste
acuracias = {}
for model in models:
  print(f"Training {type(model).__name__} model...")
  pipeline = Pipeline(stages=[tokenizer,
                              stopwords_remover,
                              hashing,
                              idf,
                              model
                              ])
  pipeline_model = pipeline.fit(df_train)
  predictions_df = pipeline_model.transform(df_test)
  evaluator = MulticlassClassificationEvaluator(
      labelCol="label",
      predictionCol="prediction",
      metricName="accuracy"
  )
  acuracias[type(model).__name__] = evaluator.evaluate(predictions_df)
  print(f"{type(model).__name__} model accuracy: {acuracias[type(model).__name__]}")

Training LogisticRegression model...
LogisticRegression model accuracy: 0.8226315789473684
Training NaiveBayes model...
NaiveBayes model accuracy: 0.038684210526315786
Training DecisionTreeClassifier model...
DecisionTreeClassifier model accuracy: 0.36276315789473684
Training RandomForestClassifier model...
RandomForestClassifier model accuracy: 0.6332894736842105


### Resultados
Eu prefiro a apresentação do pandas, então passei pra um dataframe dele

In [None]:
pddf_acc = pd.DataFrame(acuracias.items(), columns=['Modelo', 'Acuracia'])
pddf_acc

Unnamed: 0,Modelo,Acuracia
0,LogisticRegression,0.822632
1,NaiveBayes,0.038684
2,DecisionTreeClassifier,0.362763
3,RandomForestClassifier,0.633289


### DeepLearning

In [None]:
raw_keras_train = df_train.select("text").toPandas().to_dict()
raw_keras_test = df_test.select("text").toPandas().to_dict()

In [None]:
tokenizer = Tokenizer()
tokenizer.fit_on_texts(raw_keras_train['text'].values())

train_sequences = tokenizer.texts_to_sequences(raw_keras_train['text'].values())
test_sequences = tokenizer.texts_to_sequences(raw_keras_test['text'].values())

pad_train_sequences = pad_sequences(train_sequences, maxlen=100)
pad_test_sequences = pad_sequences(test_sequences, maxlen=100)

In [None]:
pad_train_sequences

array([[    0,     0,     0, ...,  4049,   797,   332],
       [    0,     0,     0, ...,     4,     1,   128],
       [    0,     0,     0, ...,     1,  1214, 14993],
       ...,
       [    0,     0,     0, ...,   346,    65,   123],
       [    0,     0,     0, ...,    42,    16,  1666],
       [    0,     0,     0, ...,  2095,  3435,    72]], dtype=int32)

In [None]:
labels = df_train.select("label").toPandas().to_numpy()
y_train = tf.keras.utils.to_categorical(labels - 1, num_classes=4)
y_train

array([[0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       ...,
       [0., 1., 0., 0.],
       [0., 1., 0., 0.],
       [0., 1., 0., 0.]])

In [None]:
model = keras.models.Sequential()
model.add(layers.Embedding(input_dim=len(tokenizer.word_index) + 1, output_dim=128))
model.add(keras.layers.LSTM(64))
model.add(layers.Dense(16, activation='relu'))
model.add(layers.Dense(4, activation='softmax'))
model.summary()

In [None]:
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])


In [None]:
history = model.fit(pad_train_sequences,
                    labels - 1,
                    epochs=30,
                    batch_size=32,
                    validation_split=0.2)

Epoch 1/30
[1m3000/3000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 12ms/step - accuracy: 0.8039 - loss: 0.5043 - val_accuracy: 0.9033 - val_loss: 0.2870
Epoch 2/30
[1m3000/3000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 12ms/step - accuracy: 0.9429 - loss: 0.1846 - val_accuracy: 0.9033 - val_loss: 0.2924
Epoch 3/30
[1m3000/3000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 12ms/step - accuracy: 0.9605 - loss: 0.1152 - val_accuracy: 0.9007 - val_loss: 0.3305
Epoch 4/30
[1m3000/3000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 12ms/step - accuracy: 0.9766 - loss: 0.0665 - val_accuracy: 0.8969 - val_loss: 0.4043
Epoch 5/30
[1m3000/3000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 12ms/step - accuracy: 0.9850 - loss: 0.0427 - val_accuracy: 0.8946 - val_loss: 0.4562
Epoch 6/30
[1m3000/3000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 12ms/step - accuracy: 0.9912 - loss: 0.0263 - val_accuracy: 0.8917 - val_loss: 0.4904
Epoc

In [None]:
test_labels = df_test.select("label").toPandas().to_numpy()
y_test = test_labels - 1
loss, accuracy = model.evaluate(pad_test_sequences, y_test)
print(f"Loss do Teste: {loss}")
print(f"Acurácia do Teste: {accuracy}")

[1m238/238[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 6ms/step - accuracy: 0.8914 - loss: 0.8545
Loss do Teste: 0.8063768744468689
Acurácia do Teste: 0.8936842083930969


In [None]:
# adicionando o lstm ao datafrmae
pddf_acc.loc[len(pddf_acc)] = ['LSTM', accuracy]
pddf_acc

Unnamed: 0,Modelo,Acuracia
0,LogisticRegression,0.822632
1,NaiveBayes,0.038684
2,DecisionTreeClassifier,0.362763
3,RandomForestClassifier,0.633289
4,LSTM,0.893684


O melhor modeo foi o lstm com o keras, agora vou otimizar os hiperparametros
## Otimizaçao de hiperparametros

In [None]:
import keras_tuner as kt

def build_lstm_model(hp):
    model = keras.models.Sequential()
    model.add(layers.Embedding(input_dim=len(tokenizer.word_index) + 1, output_dim=hp.Int('embedding_output_dim', min_value=32, max_value=256, step=32)))
    model.add(keras.layers.LSTM(units=hp.Int('lstm_units', min_value=32, max_value=128, step=32)))
    model.add(layers.Dense(units=hp.Int('dense_units', min_value=16, max_value=64, step=16), activation=hp.Choice('dense_activation', values=['relu', 'tanh'])))
    model.add(layers.Dense(4, activation='softmax'))

    model.compile(optimizer=keras.optimizers.Adam(hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='log')),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

tuner = kt.Hyperband(
    build_lstm_model,
    objective='val_accuracy',
    max_epochs=10,
    factor=3,
    directory='my_dir',
    project_name='intro_to_kt')

stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

tuner.search(pad_train_sequences, labels - 1, epochs=50, validation_split=0.2, callbacks=[stop_early])

best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""
Melhor embedding_output_dim is {best_hps.get('embedding_output_dim')}
Melhor lstm_units is {best_hps.get('lstm_units')}
Melhor dense_units is {best_hps.get('dense_units')}
Melhor dense_activation is {best_hps.get('dense_activation')}
Melhor learning_rate is {best_hps.get('learning_rate')}
""")

fim = time.time()
print(print(f"Tempo de execução{(fim - inicio)/(60*60)}"))


ModuleNotFoundError: No module named 'keras_tuner'