In [1]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as F
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

In [2]:
spark = (SparkSession.builder
        .appName("Yelp Model")
        .config("spark.executor.memory", "8g")
        .config("spark.driver.memory", "8g")
        .config("spark.sql.shuffle.partitions", "10")
        .master("local[*]")
        .getOrCreate())

print(spark.version) 


# usamos parquet de silver
df = spark.read.parquet("../data/silver/")
df.show(5, truncate=False)
df.printSchema()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/25 20:01:18 WARN Utils: Your hostname, MacBook-Pro-de-johar.local, resolves to a loopback address: 127.0.0.1; using 10.100.238.85 instead (on interface en0)
25/06/25 20:01:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/25 20:01:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


4.0.0


                                                                                

+----------------------+----------------------+----------------------+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

25/06/25 21:08:32 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1047770 ms exceeds timeout 120000 ms
25/06/25 21:08:32 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/25 21:17:46 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [None]:
# Eliminar cualquier columna conflictiva antes de aplicar el pipeline
df = df.drop("words_tokenized_final", "filtered_words_final", "rawFeatures_final", "features_final", "label_final")

# Tokenización con nombres completamente únicos
tokenizer = Tokenizer(inputCol="text", outputCol="words_tokenized_final")
df = tokenizer.transform(df)

# Eliminar stopwords
stopwords_remover = StopWordsRemover(inputCol="words_tokenized_final", outputCol="filtered_words_final")
df = stopwords_remover.transform(df)

# Transformar las palabras a minúsculas
df = df.withColumn("filtered_words_final", F.expr("transform(filtered_words_final, x -> lower(x))"))

# HashingTF con nombres de columnas completamente únicos
hashingTF = HashingTF(inputCol="filtered_words_final", outputCol="rawFeatures_final", numFeatures=1000)
df = hashingTF.transform(df)

# IDF con un nombre único para la columna de salida
idf = IDF(inputCol="rawFeatures_final", outputCol="features_final")
df = idf.fit(df).transform(df)

# Indexar las estrellas de la reseña
labelIndexer = StringIndexer(inputCol="review_stars", outputCol="label_final")
df = labelIndexer.fit(df).transform(df)

# Regresión logística con nombres únicos para las columnas
lr = LogisticRegression(featuresCol="features_final", labelCol="label_final", maxIter=10, regParam=0.1, elasticNetParam=0.8)
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashingTF, idf, labelIndexer, lr])

# Entrenar el modelo
# model = pipeline.fit(df)

                                                                                

IllegalArgumentException: Output column words_tokenized_final already exists.

In [None]:
spark.stop()

# Pre procesamiento

# Train