# Machine Learning Train and Test for Tweets Sentiment Analysis

## Environment Setup

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import mlflow
import mlflow.spark
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec

In [0]:
df_balanced_tweets = spark.table("nlp_analysis.silver_layer.labeled_tweets")
display(df_balanced_tweets.limit(20))

label,text
0,user father dysfunct selfish drag kid dysfunct run
0,user user thank lyft credit use caus offer wheelchair van pdx disapoint getthank
0,bihday majesti
0,model love u take u time ur
0,factsguid societi motiv
0,huge fan fare big talk leav chao pay disput get allshowandnogo
0,user camp tomorrow user user user user user user user danni
0,next school year year exam think school exam hate imagin actorslif revolutionschool girl
0,love land allin cav champion cleveland clevelandcavali
0,user user welcom


In [0]:
df_test = spark.table("nlp_analysis.silver_layer.test_tweets")

## Machine Learning Pipeline

In [0]:
train_df, test_df = df_balanced_tweets.randomSplit([0.75, 0.25], seed=42)

In [0]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")

In [0]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

In [0]:
w2v = Word2Vec(vectorSize=100, minCount=1, inputCol="filtered_words", outputCol="features")

In [0]:
lr = LogisticRegression(featuresCol="features", labelCol="label")

In [0]:
pipeline = Pipeline(stages=[tokenizer, remover, w2v, lr])

## Model Training

In [0]:
model = pipeline.fit(train_df)

In [0]:
predictions = model.transform(test_df)

## Model Evaluation

In [0]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
accuracy = evaluator.evaluate(predictions)

In [0]:
print("AUC ROC no conjunto de teste:", accuracy)

AUC ROC no conjunto de teste: 0.9667104743225109


In [0]:
predictions.select("text", "label", "prediction", "probability").show(truncate=False)

+-----------------------------------------------------------------------------------------+-----+----------+------------------------------------------+
|text                                                                                     |label|prediction|probability                               |
+-----------------------------------------------------------------------------------------+-----+----------+------------------------------------------+
|                                                                                         |0    |0.0       |[0.6134628152876894,0.3865371847123106]   |
|                                                                                         |0    |0.0       |[0.6134628152876894,0.3865371847123106]   |
|                                                                                         |0    |0.0       |[0.6134628152876894,0.3865371847123106]   |
|                                                                                       

In [0]:
predictions = model.transform(df_test)

In [0]:
predictions.select("text", "prediction", "probability").show(truncate=False)

+-------------------------------------------------------------------------------------------------+----------+-----------------------------------------+
|text                                                                                             |prediction|probability                              |
+-------------------------------------------------------------------------------------------------+----------+-----------------------------------------+
|selfish orlando standwithorlando pulseshoot orlandoshoot biggerproblem selfish heabreak valu love|0.0       |[0.9169141684853241,0.08308583151467586] |
|yes call michelleobama gorilla racist long thought black peopl bet                               |1.0       |[0.419499949859492,0.580500050140508]    |
|smaller hand show barri probabl lie knick game suck golf                                         |0.0       |[0.7665095731067063,0.2334904268932937]  |
|user user point one finger user million point right back jewishsup               

## Model Fine-Tuning

In [0]:
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")

Exploratory Param Grid Builder

In [0]:
# paramGrid = ParamGridBuilder() \
#     .addGrid(w2v.vectorSize, [100, 200, 300]) \
#     .addGrid(w2v.windowSize, [3, 5, 7]) \
#     .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
#     .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
#     .build()

Better Param Grid Builder

In [0]:
paramGrid = ParamGridBuilder() \
    .addGrid(w2v.vectorSize, [100]) \
    .addGrid(w2v.windowSize, [5]) \
    .addGrid(lr.regParam, [0.01]) \
    .addGrid(lr.elasticNetParam, [0.0]) \
    .build()

In [0]:
cv = CrossValidator( \
    estimator=pipeline, \
    estimatorParamMaps=paramGrid, \
    evaluator=evaluator, \
    numFolds=3)

In [0]:
cv_model = cv.fit(train_df)

In [0]:
predictions = cv_model.transform(test_df)

In [0]:
accuracy = evaluator.evaluate(predictions)
print("AUC ROC no conjunto de teste:", accuracy)

AUC ROC no conjunto de teste: 0.8958838403838761


In [0]:
predictions.select("text", "prediction", "probability").show(truncate=False)

+-----------------------------------------------------------------------------------------+----------+------------------------------------------+
|text                                                                                     |prediction|probability                               |
+-----------------------------------------------------------------------------------------+----------+------------------------------------------+
|                                                                                         |0.0       |[0.5034571701115391,0.4965428298884609]   |
|                                                                                         |0.0       |[0.5034571701115391,0.4965428298884609]   |
|                                                                                         |0.0       |[0.5034571701115391,0.4965428298884609]   |
|                                                                                         |0.0       |[0.5034571701115391,0.

## Log and Save Model in Unity Catalog

In [0]:
input_schema = Schema([
    ColSpec("string", "text")
])

In [0]:
output_schema = Schema([
    ColSpec("double", "prediction")
])

In [0]:
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

In [0]:
with mlflow.start_run(run_name="nlp_hate_speech_fine_tunned_model"):
    mlflow.spark.log_model(
        spark_model=cv_model.bestModel,
        artifact_path="hate_speech_model",
        registered_model_name="nlp_analysis.machine_learning_models.hate_speech_fine_tunned_model",
        signature=signature
    )

2025/10/07 17:16:08 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Downloading artifacts:   0%|          | 0/35 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Registered model 'nlp_analysis.machine_learning_models.hate_speech_fine_tunned_model' already exists. Creating a new version of this model...


Downloading artifacts:   0%|          | 0/39 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/39 [00:00<?, ?it/s]

Created version '3' of model 'nlp_analysis.machine_learning_models.hate_speech_fine_tunned_model'.
