In [5]:
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session; "local[*]" runs Spark in-process
# with as many worker threads as there are cores.
session_builder = SparkSession.builder.appName("MiApp").master("local[*]")
spark = session_builder.getOrCreate()

# Keep a handle on the underlying SparkContext and limit logging to WARN and above.
sc = spark.sparkContext
sc.setLogLevel("WARN")

print("Spark version:", spark.version)
print("Default parallelism:", sc.defaultParallelism)

# Smoke test: the integers 1..10 must add up to 55.
r = sc.parallelize(range(1, 11))
print("Sum test:", r.sum())


Spark version: 3.5.1
Default parallelism: 8
Sum test: 55


25/09/25 16:35:26 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## **Exploración Básica**

In [6]:
import os

# Folder that holds the lab's input files.
working_dir = "/mnt/c/Users/garci/OneDrive/Documentos/Tercer semestre U/IALab4/lab7DS/"

# Build the file path once so the existence check and the load always point at
# the same file; os.path.join also works if working_dir has no trailing slash.
constitution_path = os.path.join(working_dir, "constitution.txt")
print("Existe constitution.txt? ->", os.path.exists(constitution_path))

# One RDD element per line of the text file; show the first three as a preview.
rdd = sc.textFile(constitution_path)
rdd.take(3)


Existe constitution.txt? -> True


['We the People of the United States, in Order to form a more perfect ',
 'Union, establish Justice, insure domestic Tranquility, provide for the ',
 'common defence, promote the general Welfare, and secure the Blessings of ']

## **¿Cuántas líneas hay en el documento?**

In [9]:
# Count the elements of rdd (one per line of the loaded text file) and report the total.
num_lineas = rdd.count()
print("Número de líneas:", num_lineas)

Número de líneas: 649


                                                                                

## **¿Cómo luce la separación por palabras?**

In [10]:
# Split every line on single spaces. map() keeps one output element per input
# line, so each element of the result is a LIST of words.
splitted_lines = rdd.map(lambda text_line: text_line.split(' '))
splitted_lines.take(3)


[['We',
  'the',
  'People',
  'of',
  'the',
  'United',
  'States,',
  'in',
  'Order',
  'to',
  'form',
  'a',
  'more',
  'perfect',
  ''],
 ['Union,',
  'establish',
  'Justice,',
  'insure',
  'domestic',
  'Tranquility,',
  'provide',
  'for',
  'the',
  ''],
 ['common',
  'defence,',
  'promote',
  'the',
  'general',
  'Welfare,',
  'and',
  'secure',
  'the',
  'Blessings',
  'of',
  '']]

# **Word Count**

## **Convierte líneas → palabras (la forma correcta)**

In [11]:
# A) Naive approach (kept on purpose to show the problem): splitting on single
#    spaces without trimming leaves empty-string tokens that inflate the count.
words_rdd_bad = rdd.flatMap(
    lambda text_line: text_line.split(' ')
)
print("Ejemplo ingenuo:", words_rdd_bad.take(8))
print("Conteo ingenuo:", words_rdd_bad.count())


Ejemplo ingenuo: ['We', 'the', 'People', 'of', 'the', 'United', 'States,', 'in']
Conteo ingenuo: 8435


In [12]:
# B) Correct approach: trim each line before splitting, then discard any
#    empty tokens that remain.
words_rdd = (
    rdd.flatMap(lambda text_line: text_line.strip().split(' '))
       .filter(lambda token: token != '')
)

print("Ejemplo corregido:", words_rdd.take(8))
print("Conteo corregido:", words_rdd.count())


Ejemplo corregido: ['We', 'the', 'People', 'of', 'the', 'United', 'States,', 'in']
Conteo corregido: 7623


## **Palabra más larga**

In [13]:
# Longest token in words_rdd (punctuation still attached at this stage).
# RDD.reduce expects a commutative and associative operator, so ties on length
# are broken alphabetically (smallest string wins); the result therefore does
# not depend on the order in which elements are combined.
longest = words_rdd.reduce(
    lambda a, b: min(a, b, key=lambda w: (-len(w), w))
)
longest, len(longest)


('Representatives,', 16)

## **Dejar solo caracteres alfanuméricos**

In [14]:
import re

# Build the cleaned word list:
#   1) trim each line and split it on single spaces,
#   2) strip non-word characters (punctuation) from both ends of every token,
#   3) keep only non-empty tokens made up entirely of letters/digits.
# Tokens that still contain punctuation in the middle after step 2 are
# discarded, not repaired.
clean_words_rdd = (
    rdd.flatMap(lambda text_line: text_line.strip().split(' '))
       .map(lambda tok: re.sub(r'^\W+|\W+$', '', tok))
       .filter(lambda tok: tok != '' and tok.isalnum())
)

print("Ejemplos limpios:", clean_words_rdd.take(12))
print("Conteo limpio:", clean_words_rdd.count())


Ejemplos limpios: ['We', 'the', 'People', 'of', 'the', 'United', 'States', 'in', 'Order', 'to', 'form', 'a']
Conteo limpio: 7589


## **Recálculo de la palabra más larga**

In [15]:
# Longest word among the cleaned (purely alphanumeric) tokens.
# RDD.reduce expects a commutative and associative operator, so ties on length
# are broken alphabetically (smallest string wins); the result therefore does
# not depend on the order in which elements are combined.
longest_clean = clean_words_rdd.reduce(
    lambda a, b: min(a, b, key=lambda w: (-len(w), w))
)
longest_clean, len(longest_clean)


                                                                                

('constitutionally', 16)

# **RDDs Key-Value**

## **Convierte palabras → pares (word, 1)**

In [16]:
# Pair every cleaned word with an initial count of 1: (word, 1).
keyval_rdd = clean_words_rdd.map(
    lambda word: (word, 1)
)
keyval_rdd.take(5)

[('We', 1), ('the', 1), ('People', 1), ('of', 1), ('the', 1)]

## **Cuenta por palabra con reduceByKey**

In [17]:
# Add up the 1s of each word to obtain (word, total occurrences).
wordcount = keyval_rdd.reduceByKey(
    lambda left, right: left + right
)
wordcount.take(5)

                                                                                

[('We', 2), ('of', 494), ('United', 85), ('States', 125), ('Order', 2)]

## **Ordena por conteo (desc) y toma Top-5**

In [18]:
# Swap each pair to (count, word) so the count becomes the sort key.
counts_as_keys = wordcount.map(lambda pair: (pair[1], pair[0]))

# Sort by count, highest first, and keep the five most frequent words.
sorted_counts = counts_as_keys.sortByKey(ascending=False)
top5 = sorted_counts.take(5)
top5

[(662, 'the'), (494, 'of'), (306, 'shall'), (258, 'and'), (184, 'to')]

## **Mejora: normaliza a minúsculas y remueve stop words**

In [20]:
import re

# Lower-case the cleaned tokens. clean_words_rdd (defined in an earlier cell)
# is built with exactly the strip/split -> edge-punctuation removal ->
# isalnum() steps needed here, so it is reused instead of repeating them.
norm_words_rdd = clean_words_rdd.map(lambda w: w.lower())

# Words excluded from the ranking: common English function words plus "shall".
stopwords = set("""
a an and are as at be by for from has have he her his i in is it its of on or that the their there they this to was were will with you your we our shall
""".split())

# Drop stop words; the comparison is made on the lower-cased tokens.
no_stop_rdd = norm_words_rdd.filter(lambda w: w not in stopwords)

# Count occurrences per word, swap to (count, word), sort by count (descending)
# and keep the five most frequent words.
wordcount_ns = no_stop_rdd.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
sorted_ns = wordcount_ns.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)
top5_no_stop = sorted_ns.take(5)
top5_no_stop

[(129, 'states'),
 (110, 'president'),
 (85, 'united'),
 (79, 'state'),
 (79, 'any')]