In [1]:
# Ambiente (Java + PySpark + SparkSession)
# Fecha Spark anterior (se houver)
try:
    spark.stop()
except:
    pass

# Java + PySpark estáveis para Python 3.12
!apt-get update -qq
!apt-get install -y openjdk-17-jdk-headless -qq
!pip -q install -U pyspark==3.5.1

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["PATH"]  = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("eixo05-preprocess")
         .getOrCreate())
print("Spark OK ->", spark.version)


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Spark OK -> 3.5.1


In [2]:
# Mount Google Drive (a no-op when it is already mounted) and build the data paths
from google.colab import drive

drive.mount('/content/drive', force_remount=False)

# base_path keeps its trailing "/" so file names can be appended directly
base_path = "/content/drive/MyDrive/Eixo_05/dados/"
csv_path = f"{base_path}dataset.csv"
print("Base path:", base_path)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Base path: /content/drive/MyDrive/Eixo_05/dados/


In [3]:
# Imports e helpers mínimos
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lower, regexp_replace, concat
from pyspark.ml.feature import (
    StringIndexer, RegexTokenizer, StopWordsRemover,
    HashingTF, IDF, Word2Vec, MinMaxScaler, NGram, CountVectorizer
)

def limpar_texto(df: DataFrame, coluna="review"):
    """Normalize a free-text column so it can be tokenized.

    The steps below are applied, in order, to the column ``coluna``:

    1. Replace HTML-like tags (anything matching ``<...>``, e.g. ``<br />``)
       with a space. Replacing them with a space, rather than removing them,
       keeps the words on either side of a tag from being glued together.
    2. Replace every run of characters other than ASCII letters and space
       with one space. This removes digits, punctuation, apostrophes
       ("don't" -> "don t") and non-ASCII letters.
    3. Collapse runs of whitespace into a single space.
    4. Strip the leading/trailing space left over by the steps above.
    5. Lower-case the result.

    Args:
        df: Input Spark DataFrame.
        coluna: Name of the string column to clean. It is overwritten with
            the cleaned text.

    Returns:
        A new DataFrame with ``coluna`` replaced by its cleaned version.
    """
    texto = regexp_replace(col(coluna), r"<[^>]+>", " ")   # tags -> space (avoid word gluing)
    texto = regexp_replace(texto, r"[^A-Za-z ]+", " ")      # keep only letters/space
    texto = regexp_replace(texto, r"\s+", " ")              # collapse whitespace
    texto = regexp_replace(texto, r"^\s+|\s+$", "")         # trim both ends
    texto = lower(texto)
    # A single withColumn with the composed expression instead of five successive ones.
    return df.withColumn(coluna, texto)


In [4]:
# Load the CSV and run the shared steps (dropna, label, cleaning, tokens)

# 1) Read the file and keep only the columns used downstream
reviews = spark.read.csv(csv_path, header=True, escape="\"").select("sentiment", "review")

# 2) Drop rows missing either essential field
reviews = reviews.na.drop(subset=["sentiment", "review"])

# 3) Label: sentiment -> label
# Why stringOrderType="alphabetAsc": the default ("frequencyDesc") assigns ids by class
# frequency, so the id given to each sentiment would depend on the class balance.
# Alphabetical order pins the mapping. NOTE(review): if the only values are
# "negative"/"positive" this gives negative=0.0, positive=1.0 — confirm against the data.
# Note: handleInvalid="skip" only drops values unseen at fit time. Fit and transform use
# the same frame here, so it never drops a row: any stray sentiment value simply gets
# its own label id.
indexer = StringIndexer(inputCol="sentiment", outputCol="label",
                        stringOrderType="alphabetAsc", handleInvalid="skip")
df = indexer.fit(reviews).transform(reviews)

# 4) Text cleaning + tokenization + stop-word removal
df = limpar_texto(df, coluna="review")
df = RegexTokenizer(inputCol="review", outputCol="words", pattern=r"\W+").transform(df)
df = StopWordsRemover(inputCol="words", outputCol="filtered", caseSensitive=False).transform(df)

# 5) Cache: `df` feeds several fits/transforms, three Parquet writes and three counts.
# Without caching, each of those actions re-reads the CSV from Drive and reruns the
# regex cleaning.
df = df.cache()


In [5]:
# HashingTF and TF-IDF featurization
# NOTE(review): CountVectorizer and IDF below are fitted on every row. If a train/test
# split happens later, the vocabulary and IDF weights have already seen the test rows.

# --- HTF: hashed unigram term frequencies (no IDF weighting) ---
htf = HashingTF(numFeatures=2 ** 18, inputCol="filtered", outputCol="rawfeatures")
htf_df = htf.transform(df)
HTFfeaturizedData = (
    htf_df
    .select("sentiment", "review", "label", "rawfeatures")
    .withColumnRenamed("rawfeatures", "features")
)

# --- TF-IDF: unigrams + bigrams over an explicit vocabulary ---
# Bigrams are built from the stop-word-filtered tokens. When a stop word sat between
# two words, they end up paired even though they were not adjacent in the raw text.
ngram = NGram(inputCol="filtered", outputCol="bigrams", n=2)
df_ng = ngram.transform(df)

# Token list per review = its unigrams followed by its bigrams
df_tokens = df_ng.withColumn("tokens_12", concat("filtered", "bigrams"))

# minDF=2 keeps terms that appear in at least two documents (like sklearn's min_df=2);
# the vocabulary is capped at 2**18 terms.
cv = CountVectorizer(inputCol="tokens_12", outputCol="rawfeatures",
                     vocabSize=2 ** 18, minDF=2)
cv_model = cv.fit(df_tokens)
cv_df = cv_model.transform(df_tokens)

# Reweight the raw counts by inverse document frequency
idf = IDF(inputCol="rawfeatures", outputCol="features")
idf_model = idf.fit(cv_df)
tfidf_df = idf_model.transform(cv_df)
TFIDFfeaturizedData = tfidf_df.select("sentiment", "review", "label", "features")



In [6]:
# Word2Vec featurization followed by Min-Max scaling
# NOTE(review): Word2Vec and MinMaxScaler are both fitted on every row, so any later
# test split has already influenced the embeddings and the scaling bounds.
w2v = Word2Vec(vectorSize=250, minCount=5, seed=42,
               inputCol="filtered", outputCol="features")
w2v_model = w2v.fit(df)
w2v_df = w2v_model.transform(df)

# Rescale each embedding dimension using MinMaxScaler's default output range
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(w2v_df)
scaled = scaler_model.transform(w2v_df)

# Keep the scaled vector, exposed under the shared "features" column name
W2VfeaturizedData = (
    scaled
    .select("sentiment", "review", "label", "scaledFeatures")
    .withColumnRenamed("scaledFeatures", "features")
)


In [7]:
# Save each feature set to Parquet on Drive and tag it with a friendly name

featurized = {
    "HTFfeaturizedData": HTFfeaturizedData,
    "TFIDFfeaturizedData": TFIDFfeaturizedData,
    "W2VfeaturizedData": W2VfeaturizedData,
}

# Write every set first; "overwrite" replaces the output of a previous run
for dataset_name, frame in featurized.items():
    frame.write.mode("overwrite").parquet(base_path + dataset_name)

# Friendly names, used later during training
for dataset_name, frame in featurized.items():
    frame.name = dataset_name

print("Salvo em:")
for dataset_name in featurized:
    print(f" - {base_path}{dataset_name}")


Salvo em:
 - /content/drive/MyDrive/Eixo_05/dados/HTFfeaturizedData
 - /content/drive/MyDrive/Eixo_05/dados/TFIDFfeaturizedData
 - /content/drive/MyDrive/Eixo_05/dados/W2VfeaturizedData


In [8]:
# Quick sanity check: every featurization must keep exactly one row per review

contagens = [
    HTFfeaturizedData.count(),
    TFIDFfeaturizedData.count(),
    W2VfeaturizedData.count(),
]
print("Contagens:", *contagens)

# All three sets are row-preserving transforms of the same `df`, so their counts must agree.
assert len(set(contagens)) == 1, f"row counts differ across feature sets: {contagens}"
assert contagens[0] > 0, "feature sets are empty"


Contagens: 50000 50000 50000
