In [1]:
from pyspark.sql import SparkSession

In [12]:
datos = [("libro1","HELLO WORL! This is our  first @ document..."),
         ("libro2","I want to starT learnign something about me."),
         ("libro3","This is gonna be a book about monsters and teenagers!!!")]
schema = ['nombre','contenido']

In [13]:
spark = SparkSession.builder.appName("Dataframes") \
        .master('local[*]') \
        .getOrCreate()


In [14]:
df = spark.createDataFrame(datos, schema)

In [15]:
df.show()

+------+--------------------+
|nombre|           contenido|
+------+--------------------+
|libro1|HELLO WORL! This ...|
|libro2|I want to starT l...|
|libro3|This is gonna be ...|
+------+--------------------+



In [16]:
df.show(truncate=False)

+------+-------------------------------------------------------+
|nombre|contenido                                              |
+------+-------------------------------------------------------+
|libro1|HELLO WORL! This is our  first @ document...           |
|libro2|I want to starT learnign something about me.           |
|libro3|This is gonna be a book about monsters and teenagers!!!|
+------+-------------------------------------------------------+



In [17]:
from pyspark.sql.functions import split, lower, explode

In [18]:
df = df.withColumn('contenido', lower(df.contenido))

In [19]:
df.show(truncate=False)

+------+-------------------------------------------------------+
|nombre|contenido                                              |
+------+-------------------------------------------------------+
|libro1|hello worl! this is our  first @ document...           |
|libro2|i want to start learnign something about me.           |
|libro3|this is gonna be a book about monsters and teenagers!!!|
+------+-------------------------------------------------------+



In [20]:
from pyspark.sql.functions import *


In [21]:
df_filtrado = df.withColumn('contenido', regexp_replace('contenido', '[^a-zA-Z0-9-\\s]',''))

In [22]:
df_filtrado.show(truncate=False)

+------+----------------------------------------------------+
|nombre|contenido                                           |
+------+----------------------------------------------------+
|libro1|hello worl this is our  first  document             |
|libro2|i want to start learnign something about me         |
|libro3|this is gonna be a book about monsters and teenagers|
+------+----------------------------------------------------+



In [23]:
from pyspark.sql.functions import split, lower, explode

In [14]:
df.select( explode( split( lower(df.contenido), " ") ).alias("palabras") ).show()

+-----------+
|   palabras|
+-----------+
|      hello|
|      worl!|
|       this|
|         is|
|        our|
|           |
|      first|
|document...|
|          i|
|       want|
|         to|
|      start|
|   learnign|
|  something|
|      about|
|        me.|
|       this|
|         is|
|      gonna|
|         be|
+-----------+
only showing top 20 rows


In [15]:
df_filtrado.select( explode( split(df_filtrado.contenido, " ") ).alias("palabras") ).show()

+---------+
| palabras|
+---------+
|    hello|
|     worl|
|     this|
|       is|
|      our|
|         |
|    first|
| document|
|        i|
|     want|
|       to|
|    start|
| learnign|
|something|
|    about|
|       me|
|     this|
|       is|
|    gonna|
|       be|
+---------+
only showing top 20 rows


In [24]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to /home/jenoe/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [25]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to /home/jenoe/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [1]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords


ModuleNotFoundError: No module named 'nltk'

In [1]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

ModuleNotFoundError: No module named 'nltk'

In [1]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to /home/jenoe/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [26]:
filterWords = stopwords.words('english')

In [27]:
import pyspark.sql.functions as sf
df_filtrado = df_filtrado.withColumn("contenido",sf.regexp_replace("contenido","\\s+"," "))

In [28]:
df_filtrado.show(truncate=False)

                                                                                

+------+----------------------------------------------------+
|nombre|contenido                                           |
+------+----------------------------------------------------+
|libro1|hello worl this is our first document               |
|libro2|i want to start learnign something about me         |
|libro3|this is gonna be a book about monsters and teenagers|
+------+----------------------------------------------------+



In [29]:
df_filtrado = df_filtrado.withColumn("tokens",sf.split(sf.col("contenido"), " "))

In [30]:
df_filtrado.show(truncate=False)


+------+----------------------------------------------------+---------------------------------------------------------------+
|nombre|contenido                                           |tokens                                                         |
+------+----------------------------------------------------+---------------------------------------------------------------+
|libro1|hello worl this is our first document               |[hello, worl, this, is, our, first, document]                  |
|libro2|i want to start learnign something about me         |[i, want, to, start, learnign, something, about, me]           |
|libro3|this is gonna be a book about monsters and teenagers|[this, is, gonna, be, a, book, about, monsters, and, teenagers]|
+------+----------------------------------------------------+---------------------------------------------------------------+



In [19]:
df_filtrado = df_filtrado

In [31]:
filterWordsCol = sf.lit(filterWords)
df_filtrado = df_filtrado.withColumn("tokens",sf.filter(sf.col("tokens"), 
                                                             lambda x: ~sf.array_contains
                                                             (filterWordsCol, x)))

In [32]:
df_filtrado.show(truncate=False)

                                                                                

+------+----------------------------------------------------+----------------------------------+
|nombre|contenido                                           |tokens                            |
+------+----------------------------------------------------+----------------------------------+
|libro1|hello worl this is our first document               |[hello, worl, first, document]    |
|libro2|i want to start learnign something about me         |[want, start, learnign, something]|
|libro3|this is gonna be a book about monsters and teenagers|[gonna, book, monsters, teenagers]|
+------+----------------------------------------------------+----------------------------------+



In [33]:
df_filtrado = df_filtrado.select("id",sf.explode("tokens").alias("words"))

25/12/05 21:03:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
{"ts": "2025-12-05 21:03:43.946", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `id` cannot be resolved. Did you mean one of the following? [`words`, `nombre`, `tokens`, `contenido`]. SQLSTATE: 42703", "context": {"file": "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o315.select.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `id` cannot be resolved. Did you mean one of the following? [`words`, `nombre`, `tokens`, `

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `id` cannot be resolved. Did you mean one of the following? [`words`, `nombre`, `tokens`, `contenido`]. SQLSTATE: 42703;
'Project ['id, words#89]
+- Generate explode(tokens#76), false, [words#89]
   +- Project [nombre#25, contenido#57, filter(tokens#65, lambdafunction(NOT array_contains(array(a, about, above, after, again, against, ain, all, am, an, and, any, are, aren, aren't, as, at, be, because, been, before, being, below, between, both, ... 173 more fields), lambda x_1#77), lambda x_1#77, false)) AS tokens#76]
      +- Project [nombre#25, contenido#57, split(contenido#57,  , -1) AS tokens#65]
         +- Project [nombre#25, regexp_replace(contenido#49, \s+,  , 1) AS contenido#57]
            +- Project [nombre#25, regexp_replace(contenido#41, [^a-zA-Z0-9-\s], , 1) AS contenido#49]
               +- Project [nombre#25, lower(contenido#26) AS contenido#41]
                  +- LogicalRDD [nombre#25, contenido#26], false


In [36]:
df_filtrado = df_filtrado.select("nombre",sf.explode("tokens").alias("words"))

{"ts": "2025-12-05 21:17:38.235", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `tokens` cannot be resolved. Did you mean one of the following? [`words`, `nombre`]. SQLSTATE: 42703", "context": {"file": "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o340.select.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `tokens` cannot be resolved. Did you mean one of the following? [`words`, `nombre`]. SQLSTATE: 42703;\n'Project [nombre#25, 'explode('tokens) AS words#99]\n+- Project [nombre#25, words#91]\n   +- Generate explode(tokens#76), false, [words#91]\n      +- Project [nombre#25, contenido#57, filter(token

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `tokens` cannot be resolved. Did you mean one of the following? [`words`, `nombre`]. SQLSTATE: 42703;
'Project [nombre#25, 'explode('tokens) AS words#99]
+- Project [nombre#25, words#91]
   +- Generate explode(tokens#76), false, [words#91]
      +- Project [nombre#25, contenido#57, filter(tokens#65, lambdafunction(NOT array_contains(array(a, about, above, after, again, against, ain, all, am, an, and, any, are, aren, aren't, as, at, be, because, been, before, being, below, between, both, ... 173 more fields), lambda x_1#77), lambda x_1#77, false)) AS tokens#76]
         +- Project [nombre#25, contenido#57, split(contenido#57,  , -1) AS tokens#65]
            +- Project [nombre#25, regexp_replace(contenido#49, \s+,  , 1) AS contenido#57]
               +- Project [nombre#25, regexp_replace(contenido#41, [^a-zA-Z0-9-\s], , 1) AS contenido#49]
                  +- Project [nombre#25, lower(contenido#26) AS contenido#41]
                     +- LogicalRDD [nombre#25, contenido#26], false


In [None]:
df_filtrado.show(truncate=False)

In [37]:
docWordCount = df_filtrado.groupBy("nombre","words").agg(sf.count("words").alias("TF"))

In [38]:
df_filtrado.show(truncate=False)

                                                                                

+------+---------+
|nombre|words    |
+------+---------+
|libro1|hello    |
|libro1|worl     |
|libro1|first    |
|libro1|document |
|libro2|want     |
|libro2|start    |
|libro2|learnign |
|libro2|something|
|libro3|gonna    |
|libro3|book     |
|libro3|monsters |
|libro3|teenagers|
+------+---------+



In [27]:
docWordCount.show(5)
docWordCount.tail(5)

                                                                                

+------+--------+---+
|nombre|   words| TF|
+------+--------+---+
|libro1|   first|  1|
|libro1|document|  1|
|libro1|   hello|  1|
|libro1|    worl|  1|
|libro2|   start|  1|
+------+--------+---+
only showing top 5 rows


                                                                                

[Row(nombre='libro2', words='something', TF=1),
 Row(nombre='libro3', words='book', TF=1),
 Row(nombre='libro3', words='teenagers', TF=1),
 Row(nombre='libro3', words='monsters', TF=1),
 Row(nombre='libro3', words='gonna', TF=1)]

In [39]:
docTF = docWordCount.groupBy("words").pivot('nombre').sum('tf')
docTF = docTF.fillna(0)
docTFOrd = docTF.orderBy("words")

                                                                                

In [40]:
docTFOrd.show(100)

                                                                                

+---------+------+------+------+
|    words|libro1|libro2|libro3|
+---------+------+------+------+
|     book|     0|     0|     1|
| document|     1|     0|     0|
|    first|     1|     0|     0|
|    gonna|     0|     0|     1|
|    hello|     1|     0|     0|
| learnign|     0|     1|     0|
| monsters|     0|     0|     1|
|something|     0|     1|     0|
|    start|     0|     1|     0|
|teenagers|     0|     0|     1|
|     want|     0|     1|     0|
|     worl|     1|     0|     0|
+---------+------+------+------+



In [41]:
docCols = [x for x in docTFOrd.columns if x != 'words']

In [42]:
print(docCols)

['libro1', 'libro2', 'libro3']


In [32]:
docDF = docTFOrd.withColumn("DF",sum((sf.col(x) > 0).cast("int") for x in docCols))

PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got generator.

In [33]:
docDF = docTFOrd.withColumn("DF",sum((sf.col(x) > 0).cast("int") for x in docCols))

PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got generator.

In [34]:
docDF.show(100)

NameError: name 'docDF' is not defined

In [36]:
docDf = docTFOrd.groupBy('words')

In [37]:
docDf.show(100)

AttributeError: 'GroupedData' object has no attribute 'show'

In [38]:
docDf = docTFOrd.groupBy('words')

In [39]:
docDf.show(100)

AttributeError: 'GroupedData' object has no attribute 'show'

In [40]:
docTFOrd.groupBy('words')

GroupedData[grouping expressions: [words], value: [words: string, libro1: bigint ... 2 more fields], type: GroupBy]

In [41]:
docTFOrd.show(100)



+---------+------+------+------+
|    words|libro1|libro2|libro3|
+---------+------+------+------+
|     book|     0|     0|     1|
| document|     1|     0|     0|
|    first|     1|     0|     0|
|    gonna|     0|     0|     1|
|    hello|     1|     0|     0|
| learnign|     0|     1|     0|
| monsters|     0|     0|     1|
|something|     0|     1|     0|
|    start|     0|     1|     0|
|teenagers|     0|     0|     1|
|     want|     0|     1|     0|
|     worl|     1|     0|     0|
+---------+------+------+------+



                                                                                

In [44]:
docDF = docTFOrd.withColumn("DF", docTFOrd.groupBy('words').count())

PySparkTypeError: [NOT_COLUMN] Argument `col` should be a Column, got DataFrame.

In [45]:
docDF = docTFOrd.withColumn("DF", docTFOrd.count(lambda x: x==1))

TypeError: DataFrame.count() takes 1 positional argument but 2 were given

In [46]:
docDF = docTFOrd.withColumn("DF",sum((sf.col(x) > 0).cast("int") for x in docCols))

PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got generator.

In [43]:
from pyspark.sql.functions import col

PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got generator.

In [50]:
docDf = docTFOrd.withColumn("DF", reduce(lambda a, b: a + b, (F.col(c) for c in docCols)))

TypeError: reduce() missing 1 required positional argument: 'merge'

docTFOrd.show()

In [51]:
docTFOrd.show()

                                                                                

+---------+------+------+------+
|    words|libro1|libro2|libro3|
+---------+------+------+------+
|     book|     0|     0|     1|
| document|     1|     0|     0|
|    first|     1|     0|     0|
|    gonna|     0|     0|     1|
|    hello|     1|     0|     0|
| learnign|     0|     1|     0|
| monsters|     0|     0|     1|
|something|     0|     1|     0|
|    start|     0|     1|     0|
|teenagers|     0|     0|     1|
|     want|     0|     1|     0|
|     worl|     1|     0|     0|
+---------+------+------+------+



In [52]:
df_filtrado.show()

+------+---------+
|nombre|    words|
+------+---------+
|libro1|    hello|
|libro1|     worl|
|libro1|    first|
|libro1| document|
|libro2|     want|
|libro2|    start|
|libro2| learnign|
|libro2|something|
|libro3|    gonna|
|libro3|     book|
|libro3| monsters|
|libro3|teenagers|
+------+---------+

