In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    split, lower, explode, regexp_replace,
    lit, array_contains, regexp_extract, col, when
)

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords


[nltk_data] Downloading package stopwords to /home/daniel/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [5]:
spark = SparkSession.builder.appName('Books_TFIDF').getOrCreate()

carpeta = './libros/*.txt'  
rdd = spark.sparkContext.wholeTextFiles(carpeta)

print("Archivos encontrados:", rdd.count())

df = rdd.toDF(['ruta', 'texto'])
df = df.withColumn('doc', regexp_extract('ruta', r'([^/]+$)', 1))
df = df.withColumn('doc', regexp_replace('doc', '\\.txt$', ''))
df = df.select('doc', 'texto')

df.show(5)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/14 21:58:16 WARN Utils: Your hostname, DMPC, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/14 21:58:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/14 21:58:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Archivos encontrados: 153


                                                                                

+-----+--------------------+
|  doc|               texto|
+-----+--------------------+
|  100|﻿The Project Gute...|
| 1023|﻿The Project Gute...|
|10554|﻿The Project Gute...|
| 1080|﻿The Project Gute...|
|   11|﻿The Project Gute...|
+-----+--------------------+
only showing top 5 rows


In [6]:
def limpiar_texto(df, columna):
    return df.withColumn(
        columna,
        regexp_replace(columna, '[^a-zA-Z0-9\\s]', '')
    )

df_clean = limpiar_texto(df, 'texto')
df_clean.show(5)

                                                                                

+-----+--------------------+
|  doc|               texto|
+-----+--------------------+
|  100|The Project Guten...|
| 1023|The Project Guten...|
|10554|The Project Guten...|
| 1080|The Project Guten...|
|   11|The Project Guten...|
+-----+--------------------+
only showing top 5 rows


In [7]:
df_tokens = df_clean.withColumn(
    'palabras',
    split(lower(col('texto')), '\\s+')
)

df_tokens.select('doc', 'palabras').show(3, truncate=50)

                                                                                

+-----+--------------------------------------------------+
|  doc|                                          palabras|
+-----+--------------------------------------------------+
|  100|[the, project, gutenberg, ebook, of, the, compl...|
| 1023|[the, project, gutenberg, ebook, of, bleak, hou...|
|10554|[the, project, gutenberg, ebook, of, right, ho,...|
+-----+--------------------------------------------------+
only showing top 3 rows


In [8]:
stop_words = stopwords.words('english')

df_tokens_filtrado = df_tokens.withColumn(
    'palabras',
    F.filter('palabras', lambda x: ~array_contains(lit(stop_words), x))
)

df_tokens_filtrado.show(3, truncate=50)

                                                                                

+-----+--------------------------------------------------+--------------------------------------------------+
|  doc|                                             texto|                                          palabras|
+-----+--------------------------------------------------+--------------------------------------------------+
|  100|The Project Gutenberg eBook of The Complete Wor...|[project, gutenberg, ebook, complete, works, wi...|
| 1023|The Project Gutenberg eBook of Bleak House\r\n ...|[project, gutenberg, ebook, bleak, house, ebook...|
|10554|The Project Gutenberg eBook of Right Ho Jeeves\...|[project, gutenberg, ebook, right, ho, jeeves, ...|
+-----+--------------------------------------------------+--------------------------------------------------+
only showing top 3 rows


In [9]:
df_explode = (
    df_tokens_filtrado
    .withColumn('palabra', explode('palabras'))
    .select('doc', 'palabra')
    .filter(col('palabra') != '')
)

df_explode.show(20)

[Stage 6:>                                                          (0 + 1) / 1]

+---+------------+
|doc|     palabra|
+---+------------+
|100|     project|
|100|   gutenberg|
|100|       ebook|
|100|    complete|
|100|       works|
|100|     william|
|100| shakespeare|
|100|       ebook|
|100|         use|
|100|      anyone|
|100|    anywhere|
|100|      united|
|100|      states|
|100|       parts|
|100|       world|
|100|        cost|
|100|      almost|
|100|restrictions|
|100|  whatsoever|
|100|         may|
+---+------------+
only showing top 20 rows


                                                                                

In [10]:
df_word_count = df_explode.groupBy('doc', 'palabra').count()

df_doc_size = (
    df_explode.groupBy('doc').count()
    .withColumnRenamed('count', 'total_palabras')
)

df_TF = (
    df_word_count
    .join(df_doc_size, on='doc')
    .withColumn('tf', col('count') / col('total_palabras'))
    .select('doc', 'palabra', 'tf')
)

df_TF.show(20)



+---+----------+--------------------+
|doc|   palabra|                  tf|
+---+----------+--------------------+
|100|profitless|5.793619679767329E-6|
|100|  destroys|3.862413119844885E-6|
|100|     lofty|3.476171807860397E-5|
|100|  thoughts|5.272193908588269E-4|
|100|  bettring|1.931206559922442...|
|100|       sea|4.692831940611535...|
|100| antiquity|1.351844591945709...|
|100|    fleece|1.931206559922442...|
|100|    yellow|5.793619679767328E-5|
|100|   contain|3.283051151868153E-5|
|100|   compile|3.862413119844885E-6|
|100|  abundant| 7.72482623968977E-6|
|100|    spoils|2.317447871906931...|
|100|  answered|6.952343615720794E-5|
|100|   invited|1.931206559922442...|
|100|     angel|1.120099804755016...|
|100|    refuse|6.372981647744062E-5|
|100|warrantise|1.931206559922442...|
|100| adoptious|1.931206559922442...|
|100|   modesty|1.042851542358119E-4|
+---+----------+--------------------+
only showing top 20 rows


                                                                                

In [11]:
df_unique = df_explode.select('doc', 'palabra').distinct()

df_DF = (
    df_unique.groupBy('palabra').count()
    .withColumnRenamed('count', 'df')
)

df_DF.show(20)



+--------------+---+
|       palabra| df|
+--------------+---+
|           vor|  9|
|        harder| 72|
|        20253m|  1|
|           fog| 64|
|          hope|153|
|        online|152|
|         cures| 31|
|        vortex| 17|
|         trail| 44|
|        chiuse|  1|
| modernasalute|  1|
|     connected|107|
|    cineration|  1|
|achtungswrdige|  1|
|       courted| 26|
|      glafiras|  1|
|         still|147|
|       implore| 43|
|    occidental|  9|
|  bottleeither|  1|
+--------------+---+
only showing top 20 rows


                                                                                

In [12]:
N = df.select('doc').distinct().count()
print('Total de documentos:', N)

[Stage 19:>                                                         (0 + 2) / 2]

Total de documentos: 153


                                                                                

In [13]:
from pyspark.sql.functions import col

# Conteo de palabras por documento
df_word_count = df_explode.groupBy('doc', 'palabra').count()

# Total de palabras por documento
df_doc_size = (
    df_explode
    .groupBy('doc')
    .count()
    .withColumnRenamed('count', 'total_palabras')
)

# TF = frecuencia / total de palabras del documento
df_TF = (
    df_word_count
    .join(df_doc_size, on='doc')
    .withColumn('tf', col('count') / col('total_palabras'))
    .select('doc', 'palabra', 'tf')
)

df_TF.show(20)





+---+----------+--------------------+
|doc|   palabra|                  tf|
+---+----------+--------------------+
|100|profitless|5.793619679767329E-6|
|100|  destroys|3.862413119844885E-6|
|100|     lofty|3.476171807860397E-5|
|100|  thoughts|5.272193908588269E-4|
|100|  bettring|1.931206559922442...|
|100|       sea|4.692831940611535...|
|100| antiquity|1.351844591945709...|
|100|    fleece|1.931206559922442...|
|100|    yellow|5.793619679767328E-5|
|100|   contain|3.283051151868153E-5|
|100|   compile|3.862413119844885E-6|
|100|  abundant| 7.72482623968977E-6|
|100|    spoils|2.317447871906931...|
|100|  answered|6.952343615720794E-5|
|100|   invited|1.931206559922442...|
|100|     angel|1.120099804755016...|
|100|    refuse|6.372981647744062E-5|
|100|warrantise|1.931206559922442...|
|100| adoptious|1.931206559922442...|
|100|   modesty|1.042851542358119E-4|
+---+----------+--------------------+
only showing top 20 rows


                                                                                

In [15]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit

# Número total de documentos
N = df.select('doc').distinct().count()
print("Total documentos:", N)

df_TFIDF = (
    df_TF
    .join(df_DF, on='palabra')
    .withColumn(
        'tfidf',
        col('tf') * F.log10(lit(1) + (lit(N) / col('df')))
    )
    .select('doc', 'palabra', 'tfidf')
)

df_TFIDF.show(20)


ConnectionRefusedError: [Errno 111] Connection refused