In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ProcesarTokens") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

/usr/local/lib/python3.10/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/09 19:13:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/09 19:13:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
df = spark.read.parquet("*.parquet")

In [3]:
df.show(10)

                                                                                

+--------------+--------------------+--------------------+--------------------+-----------------+-----------------+--------------------+--------------------+--------------------+-----------+
|nombre_archivo|             content|               title|              author|authoryearofbirth|authoryearofdeath|            subjects|              tokens|     filtered_tokens|token_count|
+--------------+--------------------+--------------------+--------------------+-----------------+-----------------+--------------------+--------------------+--------------------+-----------+
|       PG42023|\nLA WEB, UNA ENC...|La web, una encic...|       Lebert, Marie|              NaN|              NaN|  {'World Wide Web'}|[la, web, una, en...|[web, enciclopedi...|      20791|
|       PG41575|\n  JUVENILIA\n\n...|Juvenilla; Prosa ...|        Cané, Miguel|           1851.0|           1905.0|{'Argentine liter...|[juvenilia, prosa...|[juvenilia, prosa...|      79537|
|       PG41746|\n\n\n\n\n\n\n\n\...|        

In [4]:
from pyspark.sql.functions import max

# Obtener el mayor valor de la columna 'token_count'
max_token_count = df.agg(max("token_count")).collect()[0][0]

print("Máximo token_count:", max_token_count)

Máximo token_count: 354168


In [5]:
# Total de libros: 132
# Cantidad total de tokens: 7,496,845
# Libro con mayor cantidad de tokens: 354,168 - Don Quijote

In [6]:
# Usando filter
libro_max_token_count = df.filter(df.token_count == 354168)

# Mostrar el resultado
libro_max_token_count.show()

+--------------+--------------------+-----------+--------------------+-----------------+-----------------+--------------------+--------------------+--------------------+-----------+
|nombre_archivo|             content|      title|              author|authoryearofbirth|authoryearofdeath|            subjects|              tokens|     filtered_tokens|token_count|
+--------------+--------------------+-----------+--------------------+-----------------+-----------------+--------------------+--------------------+--------------------+-----------+
|        PG2000|\n\n\n\nEl ingeni...|Don Quijote|Cervantes Saavedr...|           1547.0|           1616.0|{'Romances', 'Kni...|[el, ingenioso, h...|[ingenioso, hidal...|     354168|
+--------------+--------------------+-----------+--------------------+-----------------+-----------------+--------------------+--------------------+--------------------+-----------+



In [7]:
from pyspark.sql.functions import explode, col, desc

# Suponiendo que `filtered_tokens` es una columna de tipo array<string>
top_palabras = (
    df.select(explode(col("filtered_tokens")).alias("palabra"))
      .groupBy("palabra")
      .count()
      .orderBy(desc("count"))
      .limit(50)
)

top_palabras.show(50)



+-------+-----+
|palabra|count|
+-------+-----+
|     si|24715|
|    tan|13446|
|    dos|12172|
|   dijo|11839|
|    ser|11677|
|   bien|10877|
|    don|10544|
|    mas|10517|
|  usted| 9939|
|  mismo| 9233|
|   pues| 9075|
|   casa| 8970|
|    así| 8547|
| tiempo| 8315|
|después| 8063|
| hombre| 8050|
|     to| 7900|
|    vez| 7838|
|  aquel| 7444|
|  todas| 7369|
|   vida| 7333|
|  señor| 6888|
|   gran| 6579|
|aquella| 6548|
|siempre| 6449|
|   ojos| 6430|
|    fué| 6352|
|  puede| 6337|
|  parte| 6335|
|   dios| 6291|
|    the| 6276|
|   sino| 6238|
|   toda| 6223|
|   aquí| 6215|
|  hacer| 5882|
|    tal| 5868|
| aunque| 5715|
|  menos| 5513|
|    día| 5494|
|   allí| 5428|
|   años| 5399|
|    ver| 5338|
|   tres| 5251|
|   cada| 5209|
|  noche| 5161|
|  mundo| 5122|
|  ahora| 5108|
|    fin| 4981|
|  veces| 4974|
|  luego| 4964|
+-------+-----+



                                                                                

In [8]:
from pyspark.sql.functions import array_contains

# Filtrar libros que contienen la palabra "señor"
libros_filtrados = df.filter(array_contains(df.filtered_tokens, "paraguay"))

# Mostrar resultados
libros_filtrados.select("title").show(20, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                                                                    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Juvenilla; Prosa ligera                                                                                                                                                  |
|La situacion de Puerto-Rico: Las falacias de los conservadores y los compromisos del partido radical                                                                     |
|Descripción colonial, libro segundo (2/2)                                                                                                  

In [9]:
titulo_buscado = "El aceite de olivas : $b su extracción, clarificación, depuración, conservación y envases para su exportación, decoloración y medios propuestos para quitarle la rancidez"

libro = df.filter(df.title == titulo_buscado)

libro.select(explode(col("filtered_tokens")).alias("palabra")).show(100, truncate=False)

+-------------+
|palabra      |
+-------------+
|biblioteca   |
|agricultor   |
|volumen      |
|iv           |
|aceite       |
|olivas       |
|extracción   |
|clarificación|
|depuración   |
|conservación |
|envases      |
|exportación  |
|decoloración |
|medios       |
|propuestos   |
|quitarle     |
|rancidez     |
|dr           |
|bizzarri     |
|segunda      |
|edición      |
|traducida    |
|diego        |
|pequeño      |
|catedrático  |
|numerario    |
|instituto    |
|agrícola     |
|alfonso      |
|xii          |
|ex           |
|director     |
|mismo        |
|illustration |
|madrid       |
|librería     |
|agrícola     |
|casa         |
|editorial    |
|calle        |
|serrano      |
|núm          |
|aceite       |
|olivas       |
|extracción   |
|clarificación|
|depuración   |
|conservación |
|envases      |
|exportación  |
|decoloración |
|medios       |
|propuestos   |
|quitarle     |
|rancidez     |
|dr           |
|alejandro    |
|bizzarri     |
|segunda      |
|edición

In [10]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml import Pipeline

# Suponiendo que `df` tiene una columna 'clean_text' que contiene el texto limpio
# Tokenizar el texto
tokenizer = Tokenizer(inputCol="content", outputCol="tokens_tf")

# Usar HashingTF para obtener el conteo de términos (similar a la frecuencia de términos en el vectorizador)
hashing_tf = HashingTF(inputCol="tokens_tf", outputCol="raw_features", numFeatures=5000)  # 5000 características como max_features

# Calcular IDF
idf = IDF(inputCol="raw_features", outputCol="features")

# Crear el pipeline con las tres etapas: tokenización, HashingTF e IDF
pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf])

# Ajustar el pipeline
model = pipeline.fit(df)

# Transformar los datos
df_tfidf = model.transform(df)

# Mostrar los resultados
# df_tfidf.select("content", "features").show(truncate=False)


                                                                                

In [11]:
df_tfidf.select("content", "features").show()

                                                                                

+--------------------+--------------------+
|             content|            features|
+--------------------+--------------------+
|\nLA WEB, UNA ENC...|(5000,[1,4,5,10,1...|
|\n  JUVENILIA\n\n...|(5000,[0,1,2,3,4,...|
|\n\n\n\n\n\n\n\n\...|(5000,[0,1,2,3,4,...|
|\n[imagen: MUSEO ...|(5000,[0,1,2,3,4,...|
|\n  Nota del Tran...|(5000,[0,1,2,3,4,...|
|\nNotas del Trans...|(5000,[0,1,2,3,4,...|
|produced from ima...|(5000,[0,1,2,3,4,...|
|\nEn esta edición...|(5000,[0,1,2,3,4,...|
|\n  Nota del Tran...|(5000,[0,1,2,5,7,...|
|\n  Nota del Tran...|(5000,[0,2,3,4,7,...|
|\n               ...|(5000,[1,2,4,8,9,...|
|\n  En esta edici...|(5000,[0,1,2,3,4,...|
|\n\n\n\n\n\n\n\n\...|(5000,[0,2,3,4,5,...|
|\nNotas del Trans...|(5000,[0,1,2,3,4,...|
|\n  Nota del Tran...|(5000,[0,2,3,4,5,...|
|produced from ima...|(5000,[0,1,2,3,4,...|
|\n  [Nota del tra...|(5000,[0,1,2,4,7,...|
|\nNota del transc...|(5000,[0,1,2,3,4,...|
|Proofreading Team...|(5000,[0,1,2,3,4,...|
|produced from ima...|(5000,[0,1

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Suponiendo que ya tienes la columna 'features' generada en el paso anterior con TF-IDF

# Especificamos el número de clusters
num_clusters = 20

# Configuramos el modelo KMeans en Spark
kmeans = KMeans().setK(num_clusters).setSeed(42).setFeaturesCol("features").setPredictionCol("cluster_kmeans")

# Entrenamos el modelo KMeans
kmeans_model = kmeans.fit(df_tfidf)

# Realizamos las predicciones (asignación de cluster)
df_clusters = kmeans_model.transform(df_tfidf)

# Mostrar los resultados
df_clusters.select("content", "cluster_kmeans").show()


25/04/09 19:14:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                